We're wasting a lot of time waiting for our iterators to produce minibatches while running epochs. It seems like we should precompute the next minibatch while the current one is being run on the GPU. Doing this involves the multiprocessing module, which I've never used before, so here are my dev notes for writing it into the dataset iterators.


In [1]:
import multiprocessing
import numpy as np

In [2]:
p = multiprocessing.Pool(4)

In [3]:
x = range(3)

In [4]:
f = lambda x: x*2

In [5]:
def f(x):
    return x**2

In [6]:
print(x)


[0, 1, 2]

For some reason these can't be run directly in the notebook, so they have to be run as separate processes using the %%python cell magic, like so:


In [9]:
%%python
from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    p = Pool(5)
    print(p.map(f, [1, 2, 3]))


[1, 4, 9]

In [10]:
%%python
from multiprocessing import Pool
import numpy as np

def f(x):
    return x*x

if __name__ == '__main__':
    p = Pool(5)
    print(p.map(f, np.array([1, 2, 3])))


[1, 4, 9]
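
So Pool.map is happy to take a numpy array as its iterable; note that it hands back a plain Python list rather than an array, which is why the output prints the same as before.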

Now doing this asynchronously:


In [29]:
%%python
from multiprocessing import Pool
import numpy as np

def f(x):
    return x**2

if __name__ == '__main__':
    p = Pool(5)
    r = p.map_async(f, np.array([0,1,2]))
    print(dir(r))
    print(r.get(timeout=1))


['__class__', '__delattr__', '__dict__', '__doc__', '__format__', '__getattribute__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_cache', '_callback', '_chunksize', '_cond', '_job', '_number_left', '_ready', '_set', '_success', '_value', 'get', 'ready', 'successful', 'wait']
[0, 1, 4]
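
Out of everything in that dir() listing, the useful public methods on the AsyncResult are get, wait, ready and successful. A minimal self-contained example of how they behave (written in the same style as the cell above, not one of the original cells):

from multiprocessing import Pool

def f(x):
    return x**2

if __name__ == '__main__':
    p = Pool(5)
    r = p.map_async(f, [0, 1, 2])
    r.wait()                 # block until the workers have finished
    print(r.ready())         # True once the result is available (non-blocking check)
    print(r.successful())    # True if f didn't raise in any worker
    print(r.get(timeout=1))  # [0, 1, 4]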

Now trying to create an iterable that will precompute its output using multiprocessing.


In [36]:
%%python
from multiprocessing import Pool
import numpy as np

def f(x):
    return x**2

class It(object):
    def __init__(self,a):
        # store an array (2D)
        self.a = a
        # initialise pool
        self.p = Pool(4)
        # initialise index
        self.i = 0
        # initialise pre-computed first batch
        self.batch = self.p.map_async(f,self.a[self.i,:])
        
    def get(self):
        return self.batch.get(timeout=1)
    
    def f(self,x):
        return x**2

if __name__ == '__main__':
    it = It(np.random.randn(4,4))
    print(it.get())


[0.1025580452926798, 0.34677353277728845, 0.31839348399451078, 0.54494737993807252]

In [42]:
%%python
from multiprocessing import Pool
import numpy as np

def f(x):
    return x**2

class It(object):
    def __init__(self,a):
        # store an array (2D)
        self.a = a
        # initialise pool
        self.p = Pool(4)
        # initialise index
        self.i = 0
        # initialise pre-computed first batch
        self.batch = self.p.map_async(f,self.a[self.i,:])
        
    def __iter__(self):
        return self
    
    def next(self):
        # check if we've got something pre-computed to return
        if self.batch:
            # get the output
            output = self.batch.get(timeout=1)
            #output = self.batch
            # prepare next batch
            self.i += 1
            if self.i < self.a.shape[0]:
                self.p = Pool(4)
                self.batch = self.p.map_async(f,self.a[self.i,:])
                #self.batch = map(self.f,self.a[self.i,:])
            else:
                self.batch = False
            return output
        else:
            raise StopIteration

if __name__ == '__main__':
    it = It(np.random.randn(4,4))
    for a in it:
        print a


[0.11632354861762748, 1.5191669861450494, 0.27348868876418703, 0.010258287939590853]
[0.29734749705574992, 5.6932341844792448e-06, 2.6846188531586321, 1.1826209061013211]
[0.85475358111289024, 0.0073300344874941128, 0.1075715401038779, 0.041012729304054182]
[0.43303217578385272, 0.11516251129488242, 0.039813716356569974, 5.5441760979118655]
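
Note that this version builds a brand new Pool(4) every time the next batch is prepared; the later cells avoid that by reusing the pool created in __init__.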

Then we have to try and do a similar thing, but using the RandomAugment function. Of the following two cells, one uses multiprocessing and one doesn't. They're tested by pretending to ask for a minibatch and then sleeping, applying the RandomAugment function each time.


In [96]:
%%time
%%python
from multiprocessing import Pool
import numpy as np
import neukrill_net.augment
import time

class It(object):
    def __init__(self,a,f):
        # store an array (2D)
        self.a = a
        # store the function
        self.f = f
        # initialise pool (not actually used in this version; the batches are computed with plain map below)
        self.p = Pool(4)
        # initialise indices
        self.inds = range(self.a.shape[0])
        # pop a batch from top
        self.batch_inds = [self.inds.pop(0) for _ in range(100)]
        # initialise pre-computed first batch
        self.batch = map(self.f,self.a[self.batch_inds,:])
        
    def __iter__(self):
        return self
    
    def next(self):
        # check if we've got something pre-computed to return
        if self.inds != []:
            # get the output
            output = self.batch
            # prepare next batch
            self.batch_inds = [self.inds.pop(0) for _ in range(100)]
            self.p = Pool(4)
            self.batch = map(self.f,self.a[self.batch_inds,:])
            return output
        else:
            raise StopIteration

if __name__ == '__main__':
    f = neukrill_net.augment.RandomAugment(rotate=[0,90,180,270])
    it = It(np.random.randn(10000,48,48),f)
    for a in it:
        time.sleep(0.01)
        pass


CPU times: user 30 ms, sys: 40 ms, total: 70 ms
Wall time: 26.3 s

In [97]:
%%time
%%python
from multiprocessing import Pool
import numpy as np
import neukrill_net.augment
import time

class It(object):
    def __init__(self,a,f):
        # store an array (2D)
        self.a = a
        # store the function
        self.f = f
        # initialise pool
        self.p = Pool(8)
        # initialise indices
        self.inds = range(self.a.shape[0])
        # pop a batch from top
        self.batch_inds = [self.inds.pop(0) for _ in range(100)]
        # initialise pre-computed first batch
        self.batch = self.p.map_async(f,self.a[self.batch_inds,:])
        
    def __iter__(self):
        return self
    
    def next(self):
        # check if we've got something pre-computed to return
        if self.inds != []:
            # get the output
            output = self.batch.get(timeout=1)
            # prepare next batch
            self.batch_inds = [self.inds.pop(0) for _ in range(100)]
            #self.p = Pool(4)
            self.batch = self.p.map_async(f,self.a[self.batch_inds,:])
            return output
        else:
            raise StopIteration

if __name__ == '__main__':
    f = neukrill_net.augment.RandomAugment(rotate=[0,90,180,270])
    it = It(np.random.randn(10000,48,48),f)
    for a in it:
        time.sleep(0.01)
        pass


CPU times: user 9 ms, sys: 10 ms, total: 19 ms
Wall time: 6.55 s
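
Finally, a quick check that the batch coming back from map_async can be stacked into an array and reshaped to the (batch, height, width, channel) layout a minibatch needs: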

In [103]:
%%time
%%python
from multiprocessing import Pool
import numpy as np
import neukrill_net.augment
import time

class It(object):
    def __init__(self,a,f):
        # store an array (2D)
        self.a = a
        # store the function
        self.f = f
        # initialise pool
        self.p = Pool(8)
        # initialise indices
        self.inds = range(self.a.shape[0])
        # pop a batch from top
        self.batch_inds = [self.inds.pop(0) for _ in range(100)]
        # initialise pre-computed first batch
        self.batch = self.p.map_async(f,self.a[self.batch_inds,:])
        
    def __iter__(self):
        return self
    
    def next(self):
        # check if we've got something pre-computed to return
        if self.inds != []:
            # get the output
            output = self.batch.get(timeout=1)
            # prepare next batch
            self.batch_inds = [self.inds.pop(0) for _ in range(100)]
            #self.p = Pool(4)
            self.batch = self.p.map_async(f,self.a[self.batch_inds,:])
            return output
        else:
            raise StopIteration

if __name__ == '__main__':
    f = neukrill_net.augment.RandomAugment(rotate=[0,90,180,270])
    it = It(np.random.randn(10000,48,48),f)
    for a in it:
        print np.array(a).shape
        print np.array(a).reshape(100,48,48,1).shape
        break


(100, 48, 48)
(100, 48, 48, 1)
CPU times: user 2 ms, sys: 11 ms, total: 13 ms
Wall time: 2.72 s

It looks like, depending on the sleep time, this should be roughly four times as fast (26.3 s versus 6.55 s wall time above).
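
One thing to watch when writing this into the real dataset iterators: in the cells above the final pre-computed batch is never returned, because StopIteration is raised as soon as inds is empty. A rough sketch of the pattern I'm aiming for, reusing a single pool and handing back every batch (dataset_batches and the hard-coded 48x48 shape are just placeholders carried over from above, not the final interface):

from multiprocessing import Pool
import numpy as np

def dataset_batches(X, f, batch_size=100, workers=8):
    # double-buffered minibatches: the workers prepare batch i+1 while
    # batch i is (in principle) busy on the GPU
    # (assumes X.shape[0] is a multiple of batch_size)
    pool = Pool(workers)
    inds = list(range(X.shape[0]))
    batch_inds = [inds.pop(0) for _ in range(batch_size)]
    pending = pool.map_async(f, X[batch_inds, :])
    while pending is not None:
        batch = np.array(pending.get())
        # queue the next batch before handing this one back
        if len(inds) >= batch_size:
            batch_inds = [inds.pop(0) for _ in range(batch_size)]
            pending = pool.map_async(f, X[batch_inds, :])
        else:
            pending = None
        yield batch.reshape(batch.shape[0], 48, 48, 1)

This would be used the same way as the cells above, e.g. looping over dataset_batches(X, neukrill_net.augment.RandomAugment(rotate=[0, 90, 180, 270])) and handing each reshaped batch to the model.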